package com.atguigu.flink.watermark;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.flink.function.MyWordCountFlatmapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo: generate watermarks directly at the source.
 *
 * <p>Reads {@code data/ws.json} line by line through a {@link FileSource}, assigns every line an
 * event-time timestamp taken from the {@code ts} field of its JSON content, and prints the
 * resulting stream.
 *
 * <p>Created by Smexy on 2023/11/7
 */
public class Demo4_SourceGenerateWaterMark
{
    /**
     * Builds and runs the job: file source -> watermark assignment at the source -> print sink.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each line of the file becomes one String record.
        FileSource<String> source = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), new Path("data/ws.json"))
            .build();

        // The strategy for monotonously increasing timestamps, with the event time read from
        // the record itself. The two-argument lambda selects the SerializableTimestampAssigner
        // overload of withTimestampAssigner.
        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
            .<String>forMonotonousTimestamps()
            .withTimestampAssigner(
                (element, recordTimestamp) -> extractEventTime(element, recordTimestamp));

        environment
            // Generate watermarks right where the source reads the data.
            .fromSource(source, watermarkStrategy, "wc")
            .print();

        environment.execute();
    }

    /**
     * Extracts the event-time timestamp from one JSON line.
     *
     * <p>Falls back to {@code recordTimestamp} when the line yields no JSON object or the object
     * has no {@code ts} value, instead of failing with a {@link NullPointerException} on
     * auto-unboxing. Syntactically invalid JSON still raises the parser's own exception.
     *
     * @param element         one line of the input file, expected to be a JSON object
     * @param recordTimestamp the timestamp Flink currently associates with the record
     * @return the value of {@code ts}, or {@code recordTimestamp} when it is absent
     *         (NOTE(review): {@code ts} is presumably epoch milliseconds - confirm against the data)
     */
    private static long extractEventTime(String element, long recordTimestamp) {
        // parseObject may hand back null (e.g. for a blank line), so guard before dereferencing.
        JSONObject parsed = JSON.parseObject(element);
        if (parsed == null) {
            return recordTimestamp;
        }

        // getLong returns a boxed Long that is null when the key is missing.
        Long ts = parsed.getLong("ts");
        if (ts == null) {
            return recordTimestamp;
        }
        return ts;
    }
}
